Dataformワークフローをシェルスクリプトでポーリングしながら動かす

Dataformワークフローをシェルスクリプトでポーリングしながら動かす

Dataformワークフローをシェルスクリプトで実行するときに、ポーリングするようにしました。ポーリングして実行できることで、複数回Dataformワークフローを実行する時に前回の実行が完了してから後続を実行したい場合にも対応できます。
Clock Icon2024.06.20

やりたいこと

Dataformワークフローを起動した後、実行状態をポーリングして完了したら次のDataformワークフローを動かすということがやりたかったです。
というのも、以前作成したシェルスクリプトはどんどんDataformワークフローを起動していきますが、Dataformワークフロー実行APIは非同期APIとなり実行後は実行状態を返してくるだけで完了したかどうかは実行状態を問い合わせるAPIを打って確認しないとわかりません。

一つのワークフローの実行が完了したら次のワークフローを実行する、ということを自動化したかったのでワークフローの実行状態をポーリングして、ワークフロー実行完了が返ってきたら次のワークフローを実行するというシェルスクリプトを実装しました。前日データ作成完了後に次の日のデータを作成する必要がある、というユースケースの時にこのシェルスクリプトは向いています。
また、開始日、終了日を引数にしていますが配列でコンパイル変数の内容を持たせてそれを反復処理する、なんていうことにも少し手を入れれば使えると思います。

処理の流れ

シェルスクリプト全文は以下になります。

#!/bin/bash
# 開始日と終了日を指定(yyyy-mm-dd形式)
START_DATE="2023-01-01"
END_DATE="2023-02-10"
PROJECT_ID="プロジェクトID"
REPO_NAME="リポジトリ名"
POLLING_INTERVAL=30
# タイムアウトの上限(秒)
TIMEOUT_LIMIT=1800 
# 日付フォーマット変換関数
format_date() {
  date -d "$1" +"%Y%m%d"
}
# 開始日と終了日をyyyymmdd形式に変換
start_date=$(format_date "$START_DATE")
end_date=$(format_date "$END_DATE")
# 開始日から終了日までの期間をループ
current_date="$START_DATE"
while [ "$(format_date "$current_date")" -le "$end_date" ]; do
  # 日付をyyyymmdd形式で出力
  yyyymmdd_val=$(format_date "$current_date")
  yyyymmdd="\'${yyyymmdd_val}\'"
  echo "Processing date: $yyyymmdd"

  # Compilation resultの作成リクエスト
  comp_result_response=$(curl -X POST "https://dataform.googleapis.com/v1beta1/projects/$PROJECT_ID/locations/asia-northeast1/repositories/$REPO_NAME/compilationResults" \
    --header "Authorization: Bearer $(gcloud auth print-access-token)" \
    --header "Content-Type: application/json" \
    --data "{\"gitCommitish\":\"main\", \"codeCompilationConfig\":{\"vars\":{\"compile_val\":\"$yyyymmdd\"}}}")

  # レスポンスからcompilationResultsのname値を抽出
  comp_result_name=$(echo "$comp_result_response" | jq -r '.name')
  echo "Compilation Result Name: $comp_result_name"

  # Workflow invocationの作成リクエスト
  workflow_invocation_response=$(curl -X POST "https://dataform.googleapis.com/v1beta1/projects/$PROJECT_ID/locations/asia-northeast1/repositories/$REPO_NAME/workflowInvocations" \
    --header "Authorization: Bearer $(gcloud auth print-access-token)" \
    --header "Content-Type: application/json" \
    --data "{\"compilationResult\":\"$comp_result_name\"}")

  workflow_invocation_name=$(echo "$workflow_invocation_response" | jq -r '.name')
  echo "Workflow Invocation Name: $workflow_invocation_name"

  # ワークフローの状態をポーリング
  start_time=$(date +%s)
  while true; do
    status_response=$(curl -X GET "https://dataform.googleapis.com/v1beta1/$workflow_invocation_name" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json")
    workflow_state=$(echo "$status_response" | jq -r '.state')
    echo "Current Workflow State: $workflow_state"
    if [[ "$workflow_state" == "SUCCEEDED" ]]; then
      echo "Workflow succeeded."
      break
    elif [[ "$workflow_state" == "FAILED" ]]; then
      echo "Workflow failed."
      exit 1
    else
      current_time=$(date +%s)
      elapsed_time=$((current_time - start_time))
      if [[ $elapsed_time -ge $TIMEOUT_LIMIT ]]; then
        echo "Workflow polling timed out after $elapsed_time seconds."
        exit 1
      fi
      echo "Workflow still in progress. Current state: $workflow_state. Waiting..."
      sleep $POLLING_INTERVAL
    fi
  done
  current_date=$(date -I -d "$current_date + 1 day")
done

ポーリングを行なっている以下の箇所が、このシェルスクリプトの肝になるところです。

  while true; do
    status_response=$(curl -X GET "https://dataform.googleapis.com/v1beta1/$workflow_invocation_name" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json")
    workflow_state=$(echo "$status_response" | jq -r '.state')
    echo "Current Workflow State: $workflow_state"
    if [[ "$workflow_state" == "SUCCEEDED" ]]; then
      echo "Workflow succeeded."
      break
    elif [[ "$workflow_state" == "FAILED" ]]; then
      echo "Workflow failed."
      exit 1

この処理があるので、実行したDataformワークフローが実行完了したら、次のコンパイル結果を再びDataformワークフローを実行することができます。
Dataformワークフローを実行したレスポンスのnameパラメタを引数に以下のAPIを叩くとDataformワークフローの実行状態(state)を取得することができます。

https://dataform.googleapis.com/v1beta1/$workflow_invocation_name"

Dataformワークフローの実行状態は以下のstateのどれかになります。
今回の実装では、SUCCEEDEDだったら後続処理実行、FAILEDだったらシェルスクリプト自体を終了するようにしています。

state 概要
STATE_UNSPECIFIED 初期値。使用されることはない
RUNNING ワークフローが実行中
SUCCEEDED 成功
CANCELLED キャンセル
FAILED 失敗
CANCELING キャンセル中

またPOLLING_INTERVAL変数でポーリング間隔を調整することができます。
Dataformの実行結果がFAILEDSUCCEEDED以外になってしまった場合に無限ループしないようにポーリングのタイムアウト時間もTIMEOUT_LIMITで設定をしております。適宜ワークロードに則って値を調整すると良いと考えます。

まとめ

前回作成したシェルスクリプトはポーリング処理をしないでとにかく指定日付分Dataformワークフローを実行するというものでした。
DataformをAPIで実行する場合、前後関係がある実行をしたい場合は今回のようなポーリングをしてあげる必要があります。
この記事がどなたかのお役に立てば嬉しいです。それではまた。

使用しているAPI

コンパイルAPI
ワークフロー実行API
実行ワークフロー取得API

この記事をシェアする

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.